feat: add Celery integration and improve PostHog client fork safety#464

Open
parinporecha wants to merge 3 commits into PostHog:master from parinporecha:feat/celery_integration

Conversation


@parinporecha parinporecha commented Mar 13, 2026

Summary

This PR:

  • Adds a new PosthogCeleryIntegration to automatically capture Celery task lifecycle events and exceptions.
  • Propagates PostHog context (distinct_id, session_id, tags) from the task producer to the worker so Celery tasks can be correlated with the originating user/session.
  • Makes Client safer across process forks by reinitializing async consumers in child processes.
Example usage (the client initialization shown is illustrative; substitute your real project API key and host):

from posthog import Posthog
from posthog.integrations.celery import PosthogCeleryIntegration

client = Posthog("<ph_project_api_key>", host="https://us.i.posthog.com")

integration = PosthogCeleryIntegration(
    client=client,
    capture_exceptions=True,
    capture_task_lifecycle_events=True,
    propagate_context=True,
)
integration.instrument()

Context

I saw users asking in community questions how to use PostHog with Celery for error tracking, and realized that there's currently no first-class way to instrument Celery workloads with PostHog.

That leaves a few gaps:

  • Background task execution is hard to observe without manual instrumentation.
  • Worker-side events are difficult to correlate back to the originating user or request.

This PR addresses those gaps by adding a Celery integration that helps users observe task execution end-to-end out of the box.

The integration takes inspiration from OpenTelemetry's Celery instrumentor; PostHog context propagation is achieved through task headers, mirroring Sentry's and Datadog's Celery integrations.

While testing this, I found a separate SDK issue: when a Client configured in async mode is inherited across a process fork, the child process can inherit a client whose consumer threads no longer exist. In practice, that means worker-side events may not be delivered reliably.

To make it safer, this PR also adds fork handling to Client by reinitializing its queue and consumers in the child process via os.register_at_fork. That said, the recommended setup for Celery remains to initialize a fresh Client and instrument PosthogCeleryIntegration inside each worker process, as shown in the example.
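The reinit-after-fork pattern described above can be sketched in plain Python (this is an illustrative stand-in, not the actual PostHog `Client`; the names `_reinit_after_fork` and `consumer_pid` mirror the PR's description but the internals are simplified):

```python
import os


class ForkSafeClient:
    """Sketch of a client that rebuilds its worker state after a fork."""

    def __init__(self):
        self._start_consumers()
        # Re-create the queue and consumers in any forked child (POSIX only).
        if hasattr(os, "register_at_fork"):
            os.register_at_fork(after_in_child=self._reinit_after_fork)

    def _start_consumers(self):
        # Stand-in for creating the send queue and consumer threads;
        # real consumer threads do not survive a fork, which is why the
        # child must rebuild them.
        self.queue = []
        self.consumer_pid = os.getpid()

    def _reinit_after_fork(self):
        # Called automatically in the child process right after fork().
        self._start_consumers()
```

After a `fork()`, the child's `consumer_pid` reflects the child process rather than the parent, showing that fresh consumers were started where the inherited threads would otherwise be dead.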

Changes

New: Celery Integration (posthog/integrations/celery.py)

  • Lifecycle Events: Hooks into Celery signals (task_prerun, task_success, task_failure, etc.) to capture events such as celery task started and celery task success. See the docstring in the integration module for the complete list of supported events.
    • Lifecycle events include Celery-specific properties such as task ID, task name, queue, retry count, duration, and Celery version. See the docstring for the complete set of event properties.
  • Context Propagation:
    • _on_before_task_publish: Injects current PostHog context (distinct_id, session_id, tags) into task headers.
    • _on_task_prerun: Extracts headers in the worker and restores the PostHog context for the duration of the task. This context is exited upon task completion.
    • Any custom events captured inside a task inherit the same propagated PostHog context and Celery task tags.
  • Exception Capture: Automatically captures exceptions from failed tasks.
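The producer/worker handshake above can be sketched with plain dicts standing in for Celery's message headers (the header key names below are illustrative assumptions, not necessarily the integration's actual keys):

```python
def inject_context(headers, distinct_id=None, session_id=None, tags=None):
    """Producer side (before_task_publish): copy the current PostHog
    context into the outgoing task headers."""
    if distinct_id is not None:
        headers["posthog_distinct_id"] = distinct_id
    if session_id is not None:
        headers["posthog_session_id"] = session_id
    if tags:
        headers["posthog_tags"] = dict(tags)
    return headers


def extract_context(headers):
    """Worker side (task_prerun): restore the propagated context from
    the incoming task headers."""
    return {
        "distinct_id": headers.get("posthog_distinct_id"),
        "session_id": headers.get("posthog_session_id"),
        "tags": headers.get("posthog_tags") or {},
    }
```

A round trip through these two functions is what lets events captured inside the task carry the originating user's distinct ID, session ID, and tags.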

Refactored: Client Fork Safety (posthog/client.py)

  • Added _reinit_after_fork method to reset the internal queue and spin up new consumers in a child process.
  • Uses os.register_at_fork (on supported platforms) to automatically call this method, so that the SDK does not drop captured events when used in child processes.

Examples (examples/celery_integration.py)

  • Added a complete example showing how to configure the integration on both the producer and worker sides, demonstrating all features in practice.

Tests

  • Added posthog/test/integrations/test_celery_integration.py covering:
    • Signal handlers for all task states.
    • Context propagation (header injection and extraction).
    • Task filtering logic.
    • Exception capture.
  • Added tests in posthog/test/test_client.py for _reinit_after_fork to verify consumers are restarted correctly.
  • Manually tested the example against Celery 5.2.1 (2021), 5.3.1, 5.4.0, and 5.6.2 (2026).

Screenshots (created through example script)

  • Celery task lifecycle events and captured Exception -
    image

  • Celery task success event emitted from the worker, carrying the correct distinct ID and session ID set in the parent, plus context tags -
    image

  • Captured exception -
    image

@parinporecha parinporecha force-pushed the feat/celery_integration branch 6 times, most recently from 756a6a2 to e8eff44 on March 14, 2026 23:01
@parinporecha parinporecha force-pushed the feat/celery_integration branch from e8eff44 to 896dfa8 on March 14, 2026 23:11
@parinporecha parinporecha changed the title feat: add Celery integration & add fork safety to PostHog client feat: add Celery integration and improve PostHog client fork safety Mar 15, 2026
@parinporecha parinporecha marked this pull request as ready for review March 15, 2026 10:42

greptile-apps bot commented Mar 15, 2026

Path: posthog/integrations/celery.py
Line: 225-248

Comment:
**Context leak if `_on_task_prerun` raises after entering context**

If any code between line 233 (`context_manager.__enter__()`) and the end of the try block throws an exception, the `except` on line 247 swallows it but the context is never exited — it remains pushed onto the `contextvars` stack for the remainder of the thread's life. This corrupts context state for subsequent tasks in the same worker thread.

For example, if `self._apply_propagated_identity(request)` (line 239) or `self._capture_event(...)` (line 246) raises, the context will leak. Similarly, if `request` is `None`, the context manager is entered but never stored on `request._posthog_ctx`, so `_handle_task_end`'s finally block can never exit it.

Consider cleaning up the context in the `except` block:

```python
    def _on_task_prerun(self, *args, **kwargs):
        context_manager = None
        try:
            task_id = kwargs.get("task_id")
            if not task_id:
                return

            sender = kwargs.get("sender")
            request = getattr(sender, "request", None)
            context_tags = self._extract_propagated_tags(request)
            task_properties = self._build_task_properties(
                sender=sender,
                task_id=task_id,
                state="started",
            )
            task_name = task_properties.get("celery_task_name")

            context_manager = contexts.new_context(
                fresh=True,
                capture_exceptions=False,
                client=self.client,
            )
            context_manager.__enter__()

            if request is not None:
                request._posthog_ctx = context_manager
                request._posthog_start = time.monotonic()

            self._apply_propagated_identity(request)

            merged_tags = {**task_properties, **context_tags}
            for key, value in merged_tags.items():
                contexts.tag(key, value)

            if self.capture_task_lifecycle_events and self._should_track(task_name, task_properties):
                self._capture_event("celery task started", properties=task_properties)
        except Exception:
            logger.exception("Failed to process Celery task_prerun")
            if context_manager is not None:
                try:
                    context_manager.__exit__(None, None, None)
                except Exception:
                    pass
```


---

Path: posthog/client.py
Line: 336-337

Comment:
**`register_at_fork` prevents Client garbage collection**

`os.register_at_fork` callbacks cannot be unregistered. The bound method `self._reinit_after_fork` holds a strong reference to `self`, which means:

1. Every `Client` instance registered here will never be garbage collected for the lifetime of the process.
2. If multiple `Client` instances are created (e.g., in tests, or per-request patterns), each fork will run *all* accumulated callbacks, including for defunct/shutdown clients.

A common mitigation is to use a `weakref` callback so the client can still be collected:

```python
import weakref

# in __init__, replace the current register_at_fork block with:
if hasattr(os, "register_at_fork"):
    weak_self = weakref.ref(self)
    def _reinit_child():
        client = weak_self()
        if client is not None:
            client._reinit_after_fork()
    os.register_at_fork(after_in_child=_reinit_child)
```

This way, if the Client is no longer referenced, the callback becomes a no-op rather than keeping the entire Client alive.


Last reviewed commit: 84b314c
